package cn.edu360.report

import cn.edu360.beans.ReportMediaAnalysis
import cn.edu360.utils.{ConfigHandler, MySQLHandler, RptKpiTools}
import org.apache.commons.lang3.StringUtils
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Media analysis report, implemented with a broadcast dictionary.
  *
  * Reads the parquet data set at `ConfigHandler.parquetPath`, groups the rows by
  * app name, sums the per-row KPI values produced by `RptKpiTools.offLineKpi`
  * and stores the aggregated rows through `MySQLHandler.save2db`.
  *
  * sheep.Old @ 64341393
  * Created 2018/5/9
  */
object RptMediaAnalysis {
  /**
    * Job entry point.
    *
    * @param args command-line arguments; not used by this job
    */
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setAppName("媒体分析")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    // Default to local mode, but do not override a master supplied externally
    // (e.g. by spark-submit).
    sparkConf.setIfMissing("spark.master", "local[*]")

    val sc = new SparkContext(sparkConf)
    try {
      // Build the app dictionary from the tab-separated dictionary file: lines with
      // fewer than 5 fields are skipped, field 4 is the key (looked up by appid below)
      // and field 1 is the value (used as the app name).
      val appdictMap: Map[String, String] = sc.textFile(ConfigHandler.appDictPath)
        .map(line => line.split("\t", -1))
        .filter(_.length >= 5)
        .map(fields => (fields(4), fields(1)))
        .collect()
        .toMap
      // Broadcast the dictionary so each executor receives a single copy.
      val appdictBT: Broadcast[Map[String, String]] = sc.broadcast(appdictMap)

      val sQLContext = new SQLContext(sc)
      // Load the raw data.
      val rawDataFrame = sQLContext.read.parquet(ConfigHandler.parquetPath)
      import sQLContext.implicits._

      // Keep rows that carry a usable appid or appname. `x != null` is always NULL in
      // Spark SQL, so null checks must be written with `is not null`.
      val identifiable = rawDataFrame.filter(
        "(appid is not null and appid != '') or (appname is not null and appname != '')")

      val result: DataFrame = identifiable.rdd
        .map(row => {
          val appId = row.getAs[String]("appid")
          val rawAppName = row.getAs[String]("appname")
          // When the row has no app name, resolve it from the broadcast dictionary and
          // fall back to the app id itself if the dictionary has no entry.
          val appName =
            if (StringUtils.isEmpty(rawAppName)) appdictBT.value.getOrElse(appId, appId)
            else rawAppName
          (appName, RptKpiTools.offLineKpi(row))
        })
        // Element-wise sum of the KPI sequences that share the same app name.
        .reduceByKey((left, right) => left.zip(right).map(tp => tp._1 + tp._2))
        .map(t => ReportMediaAnalysis(t._1, t._2(0), t._2(1), t._2(2), t._2(3), t._2(4), t._2(5), t._2(6), t._2(7), t._2(8)))
        .toDF()

      MySQLHandler.save2db(result, ConfigHandler.mediaAnalysisTableName)
    } finally {
      // Always release the SparkContext, even when the job fails part-way.
      sc.stop()
    }
  }
}
